
Setting up a Kafka environment

Configuring the environment

Set up the environment directly with Docker. Create a docker-compose.yml file, based on the single-Kafka configuration from kafka-stack-docker-compose:

version: '3.8'

volumes:
  zksingle-ksingle_kafka1-data:
  zksingle-ksingle_zoo1-data:
  zksingle-ksingle_zoo1-log:

services:
  zoo1:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
    volumes:
      - type: volume
        source: zksingle-ksingle_zoo1-data
        target: /data
      - type: volume
        source: zksingle-ksingle_zoo1-log
        target: /datalog

  kafka1:
    image: confluentinc/cp-kafka:6.2.1
    hostname: kafka1
    user: "appuser:appuser"
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    volumes:
      - type: volume
        source: zksingle-ksingle_kafka1-data
        target: /var/lib/kafka/data
    depends_on:
      - zoo1
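
Assuming the file above is saved as docker-compose.yml, one way to bring the stack up and get a shell inside the broker (a sketch, not from the original post) is to address the container by its compose service name:

# Start ZooKeeper and Kafka in the background
docker-compose up -d

# Open a shell inside the broker container, addressed by service name
docker-compose exec kafka1 bash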

Window A: with the services up, send messages to the web_log topic and inspect it.

# Send messages
$ /bin/kafka-console-producer --topic=web_log --broker-list kafka1:9092

Four messages were sent, as illustrated below.
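
The console producer reads one message per line at the > prompt; a session might look like this (the message bodies here are only illustrative):

>this is the first message
>this is the second message
>this is the third message
>this is the fourth message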

# List the topics that have been created
$ /bin/kafka-topics --list --zookeeper zoo1:2181

You can see that a web_log topic has been created (the console producer creates it automatically on first use, since it was never created explicitly).

Window B: consume the messages from web_log.

# Consume messages
$ /bin/kafka-console-consumer --bootstrap-server kafka1:9092 --from-beginning --topic web_log

Operating Kafka from Go

The Kafka client

Here we use Sarama, a Go client library for Apache Kafka. (The project has since moved from Shopify to github.com/IBM/sarama; these examples use the older import path.)

go get -u github.com/Shopify/sarama

Official documentation

Producer

The producer code is shown below:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

var Topic = "web_log" // topic name

// A Kafka client built on the third-party Sarama library
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all in-sync replicas to acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // delivered messages are reported on the Successes channel (required by SyncProducer)

	// Connect to Kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}

	defer client.Close()

	// Example 1: send a single message
	// Build a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = Topic
	content := "this is a test log"
	send01(client, msg, content)

	// Example 2: send multiple messages
	for _, word := range []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		send01(client, msg, word)
	}
}

// Send one message
func send01(client sarama.SyncProducer, msg *sarama.ProducerMessage, content string) {
	msg.Value = sarama.StringEncoder(content)

	// Send synchronously; on success the partition and offset are returned
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}

	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
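
If the broker is reachable, each call prints the partition and offset the message landed on, in the Printf format above. With a single-partition topic the output would look something like this (actual offsets depend on what the topic already contains):

pid:0 offset:4
pid:0 offset:5
pid:0 offset:6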

Viewing the files on disk

# Inside the broker container, go to the Kafka data directory (the volume target from the compose file)
cd /var/lib/kafka/data

# You should see a web_log-0 directory; web_log is the topic used above
ls -l

# Look inside the partition directory
ls web_log-0/ -l

Indeed, a partition appears on the server as a plain directory. Each partition directory holds one or more sets of segment files, and each set consists of an .index file, a .log file, and a .timeindex file. The .log file is where the messages are actually stored, while the .index and .timeindex files are index files used to look messages up.
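
To peek inside a segment, the kafka-dump-log tool that ships with the Confluent image can decode a .log file. A sketch, assuming the partition still has its first segment (segment files are named after their base offset):

# Decode the first segment of partition 0 and print each message payload
/bin/kafka-dump-log --files /var/lib/kafka/data/web_log-0/00000000000000000000.log --print-data-log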

Consumer

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// kafka consumer

var Topic = "web_log" // topic name

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}

	partitionList, err := consumer.Partitions(Topic) // get all partitions of the topic
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for _, partition := range partitionList { // iterate over all partitions
		// Create a partition consumer for each partition
		pc, err := consumer.ConsumePartition(Topic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			return
		}

		defer pc.AsyncClose()
		// Consume messages from each partition asynchronously
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)

	}
	select {} // block the process so the goroutines keep consuming
}

The messages are consumed successfully.

Note the sarama.OffsetOldest above.

Problem: duplicate consumption in Kafka. Every time the service restarts, the consumer fetches an offset from the broker and ends up re-reading old messages.

Explanation: each time the consumer pulls data from the broker, the offset is committed automatically after the messages are consumed. Auto-commit runs on a timer, so a commit is not guaranteed to succeed; the program may crash in the middle of a commit (for example, when the service is restarted). After a restart, the consumer picks a starting offset for consumption. Sarama's default initialOffset is OffsetNewest, which pulls only the newest messages from the broker, while OffsetOldest pulls from the oldest message still retained. Our business code was configured to pull from the oldest message and did no idempotent handling, which is why messages were consumed repeatedly.
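
A minimal sketch of controlling this through the Sarama config rather than a hard-coded argument (broker address and topic carried over from the examples above; whichever starting offset is chosen, the handler still needs to be idempotent):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// OffsetNewest (Sarama's default) starts at messages produced after startup;
	// OffsetOldest replays everything the broker still retains.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	defer consumer.Close()

	// Pass the configured value through so the policy lives in one place
	pc, err := consumer.ConsumePartition("web_log", 0, config.Consumer.Offsets.Initial)
	if err != nil {
		fmt.Printf("fail to start partition consumer, err:%v\n", err)
		return
	}
	defer pc.AsyncClose()

	for msg := range pc.Messages() {
		fmt.Printf("Partition:%d Offset:%d Value:%s\n", msg.Partition, msg.Offset, string(msg.Value))
	}
}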

Using Kafka with the gin framework

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/gin-gonic/gin"
)

var Topic = "web_log" // topic name
var kafkaIp = "127.0.0.1:9092"

// Example handler (not registered with a route)
func Test(ctx *gin.Context) {
	ctx.JSON(200, gin.H{
		"data": "product",
	})
}

func main() {
	// Start the consumer
	go InitConsumer()

	r := gin.Default()
	r.GET("/ping", func(c *gin.Context) {
		c.JSON(200, gin.H{
			"message": "pong",
		})
	})
	r.GET("/send", SendMessage) // http://localhost:8082/send
	r.Run("0.0.0.0:8082")       // listen and serve on 0.0.0.0:8082
}

// Send messages to Kafka. For simplicity a new producer is created per
// request; a production service would create one producer and reuse it.
func SendMessage(ctx *gin.Context) {
	fmt.Println("SendMessage")
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all in-sync replicas to acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // delivered messages are reported on the Successes channel

	// Connect to Kafka
	client, err := sarama.NewSyncProducer([]string{kafkaIp}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()

	// Example 1: send a single message
	// Build a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = Topic
	content := "this is a test log"
	sendTokafka(client, msg, content)

	// Example 2: send multiple messages
	for _, word := range []string{"Welcome11", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		sendTokafka(client, msg, word)
	}
}

// Send one message
func sendTokafka(client sarama.SyncProducer, msg *sarama.ProducerMessage, content string) {
	msg.Value = sarama.StringEncoder(content)

	// Send the message
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

// Initialize the consumer
func InitConsumer() {
	time.Sleep(time.Second * 3)
	fmt.Println("init Consumer success")

	var wg sync.WaitGroup
	consumer, err := sarama.NewConsumer([]string{kafkaIp}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions(Topic) // get all partitions of the topic
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}

	fmt.Println(partitionList)

	for _, partition := range partitionList { // iterate over all partitions
		wg.Add(1)
		// Create a partition consumer for each partition
		pc, err := consumer.ConsumePartition(Topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
			return
		}

		// Consume messages from each partition asynchronously; the range loop
		// only ends once the partition consumer is closed, so Wait() below
		// keeps the consumer alive for the lifetime of the process
		go func(pc sarama.PartitionConsumer) {
			defer pc.AsyncClose()
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	// select {} // alternatively, block the process forever
	wg.Wait()
	consumer.Close()
}
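
With the service running, the registered routes can be exercised from another terminal; /send triggers the producer, and the consumer goroutine prints what it receives:

curl http://localhost:8082/ping   # responds {"message":"pong"}
curl http://localhost:8082/send   # produces the example messages to web_log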

Configuring a graphical interface

## Installing Kafka with Docker/Docker-Compose: https://juejin.cn/post/7091842457318473764

version: '3.5'
services:
  zookeeper:
    # restart: always
    image: wurstmeister/zookeeper ## image
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181" ## externally exposed port
  kafka:
    # restart: always
    image: wurstmeister/kafka ## image
    container_name: kafka
    volumes:
      - /etc/localtime:/etc/localtime ## mount so the container clock stays in sync with the host
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 ## change this to the host machine's IP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
  kafka-manager:
    # restart: always
    image: sheepkiller/kafka-manager ## image: an open-source web UI for managing Kafka clusters
    container_name: kafka-manager
    environment:
      ZK_HOSTS: "zookeeper:2181"
    ports:
      - "9009:9000" ## remapped to 9009 on the host, since port 9000 conflicts with too many other services
    links:
      - zookeeper
      - kafka
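
Assuming this file is saved as docker-compose.yml in its own directory, the stack starts the same way; kafka-manager listens on container port 9000, which the mapping above publishes on the host as 9009:

docker-compose up -d
# Then open http://localhost:9009 in a browser; inside the compose network the
# ZooKeeper host for a new cluster is zookeeper:2181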


Reference

- Kafka导致重复消费原因和解决方案
- 【Golang】kafka使用之路(三)go语言操作kafka